from kafka import KafkaConsumer


def consume_messages(topic, group_id):
    # Kafka 服务器配置
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='10.10.1.80:9092',
        group_id=group_id,
        auto_offset_reset='earliest',  # 从最早的消息开始消费
        enable_auto_commit=True,  # 自动提交偏移量
        value_deserializer=lambda x: x.decode('utf-8')  # 解码消息
    )

    print(f"Listening to topic '{topic}' in group '{group_id}'...")
    try:
        for message in consumer:
            print(f"Received message: {message.value} (partition: {message.partition}, offset: {message.offset})")
    except KeyboardInterrupt:
        print("Consumer stopped.")
    finally:
        consumer.close()


if __name__ == "__main__":
    topic_name = "DRS00001"
    group_id = "drs01"
    consume_messages(topic_name, group_id)
